{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kubeflow Pipeline: Exporting BQML Models to Online AI Platform Prediction\n",
    "\n",
    "The notebook \"Tutorial: Exporting BQML Models to Online AI Platform Prediction\" walks through the concepts of training model in BQML and exporting the model to be served in AI Platform.  This notebook takes that process and implements a Kubeflow Pipeline to automate the steps."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp\n",
    "import kfp.components as comp\n",
    "import random\n",
    "import os"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Common Parameters\n",
    "PROJECT_ID=[] #enter your project name\n",
    "KFPHOST=[] #enter your KFP hostname\n",
    "\n",
    "#Parameters for BQML\n",
    "DATASET=[]     #name of dataset to create or use if exists\n",
    "VIEW= []       #name of view to be created for BQML create model\n",
    "MODEL=[]       #model name for both BQML and AI Platform\n",
    "ALGO=[]        #e.g. 'linear_reg'\n",
    "\n",
    "#Parameters for AI Platform Prediction\n",
    "REGION=[]               #e.g. 'us-central1'\n",
    "MODEL_VERSION=[]        #e.g. 'v1'\n",
    "RUNTIME_VERSION='1.15'  #do not change\n",
    "PYTHON_VERSION='3.7'    #do not change\n",
    "MODEL_BUCKET='gs://{0}-{1}'.format(PROJECT_ID,str(random.randrange(1000,10000)))\n",
    "MODEL_PATH=os.path.join(MODEL_BUCKET,'bqml/model/export/',MODEL,MODEL_VERSION)\n",
    "\n",
    "#Parameters for KF Pipeline\n",
    "KFP_EXPERIMENT_NAME='Natality Pipeline'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating KFP Ops\n",
    "Each step in a Kubeflow Pipeline is a container operation.  If the operation you would like to accomplish is already available in the components library in the KFP repo, the process of creating an op is simply loading it.  AI Platform provides such an op for model deployment."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mlengine_deploy_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/ml_engine/deploy/component.yaml')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you cannot find the op in the component library to support your need, you will need to create that container.  An easy way to do this is via kfp.components.func_to_container_op.  In our case, we would like to execute a number of bq commands.  To do this, we will create a Python function as a general command executor, then convert the function to a container op via kfp.components.func_to_container_op."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def gcp_command_func(project_id: str, command_string: str) -> str:\n",
    "    import subprocess\n",
    "    config_string=\"gcloud config set project {}\".format(project_id)\n",
    "    config=subprocess.run(config_string, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)\n",
    "    print(\"Running command: {}\".format(command_string))\n",
    "    response=subprocess.run(command_string, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)\n",
    "    print(\"Command response: {}\".format(response.stdout))\n",
    "    return project_id\n",
    "\n",
    "gcp_command_op=comp.func_to_container_op(func=gcp_command_func, base_image=\"google/cloud-sdk:latest\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prepare bq commands\n",
    "There are four bq operations in our pipeline:\n",
    "1. Create Dataset\n",
    "2. Create View\n",
    "3. Create Model\n",
    "4. Export Model\n",
    "\n",
    "The following will create the commands that will be executed by the gcp_command_op created above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def make_bucket(bucket):\n",
    "    return \"gsutil ls {0} || gsutil mb {0}\".format(bucket)\n",
    "\n",
    "def create_dataset(dataset):\n",
    "    return \"bq show {0} || bq mk {0}\".format(dataset)\n",
    "\n",
    "def create_view(dataset, view):\n",
    "    query = \"\"\"\n",
    "        SELECT\n",
    "          weight_pounds,\n",
    "          is_male,\n",
    "          gestation_weeks,\n",
    "          mother_age,\n",
    "          CASE\n",
    "            WHEN MOD(CAST(ROUND(weight_pounds*100) as int64), 10) < 8 THEN \"training\"\n",
    "            WHEN MOD(CAST(ROUND(weight_pounds*100) as int64), 10) = 8 THEN \"evaluation\"\n",
    "            WHEN MOD(CAST(ROUND(weight_pounds*100) as int64), 10) = 9 THEN \"prediction\"\n",
    "          END AS datasplit\n",
    "        FROM\n",
    "          `bigquery-public-data.samples.natality`\n",
    "        WHERE\n",
    "          weight_pounds IS NOT NULL\n",
    "    \"\"\".format(dataset, view)\n",
    "    return \"bq show {1}.{2} || bq mk --use_legacy_sql=false --view '{0}' {1}.{2}\".format(query, dataset, view)\n",
    "\n",
    "def create_model(dataset, view, model, algo):\n",
    "    query = \"\"\"\n",
    "        CREATE OR REPLACE MODEL\n",
    "          `{0}.{2}`\n",
    "        OPTIONS\n",
    "          (model_type=\"{3}\",\n",
    "            input_label_cols=[\"weight_pounds\"]) AS\n",
    "                SELECT\n",
    "                  weight_pounds,\n",
    "                  is_male,\n",
    "                  gestation_weeks,\n",
    "                  mother_age\n",
    "                FROM\n",
    "                  {0}.{1}\n",
    "                WHERE\n",
    "                  datasplit = \"training\"\n",
    "    \"\"\".format(dataset, view, model, algo)\n",
    "    return \"bq show {1}.{3} || bq query --use_legacy_sql=false '{0}'\".format(query, dataset, view, model, algo)\n",
    "    \n",
    "def export_model(dataset, model, export_path):\n",
    "    return \"bq extract -m {0}.{1} {2}\".format(dataset, model, export_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Ops in a Kubeflow Pipeline is organized in to a Directed Acyclic Graph (DAG).  Order of operation of the ops is controlled by their dependencies on other ops.  Ops with no dependencies or dependencies that are satisfied will run.  Dependencies can be naturally created when one op's input is dependent on another op's output.  If your ops do not have that kind of dependency, you can still manually enforce it with this pattern:\n",
    "\n",
    "`current_op.after(previous_op) where current_op depends on previous_op`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@kfp.dsl.pipeline(\n",
    "    name='BQML Model Export to AI Platform Prediction',\n",
    "    description='This pipeline trains a BQML model and exports to GCS, then loads into AI Platform Prediction.'\n",
    ")\n",
    "\n",
    "def bqml_to_caip(project_id = PROJECT_ID,\n",
    "                 bucket=MODEL_BUCKET,\n",
    "                 model_path=MODEL_PATH,\n",
    "                 dataset=DATASET,\n",
    "                 view=VIEW,\n",
    "                 model=MODEL,\n",
    "                 model_version=MODEL_VERSION,\n",
    "                 algo=ALGO,\n",
    "                 export_path=MODEL_PATH,\n",
    "                 runtime_version=RUNTIME_VERSION,\n",
    "                 python_version=PYTHON_VERSION,\n",
    "                 region=REGION\n",
    "):\n",
    "    \n",
    "    #Prepare commands for gcp_command_op\n",
    "    make_bucket_command=make_bucket(bucket)\n",
    "    create_dataset_command=create_dataset(dataset)\n",
    "    create_view_command=create_view(dataset, view)\n",
    "    create_model_command=create_model(dataset, view, model, algo)\n",
    "    export_model_command=export_model(dataset, model, export_path)\n",
    "    \n",
    "    #Create ops in pipeline\n",
    "    make_bucket_op=gcp_command_op(project_id=project_id,\n",
    "                                  command_string=make_bucket_command)\n",
    "    create_dataset_op=gcp_command_op(project_id=project_id,\n",
    "                                     command_string=create_dataset_command)\n",
    "    create_view_op=gcp_command_op(project_id=project_id,\n",
    "                                  command_string=create_view_command)\n",
    "    create_model_op=gcp_command_op(project_id=project_id,\n",
    "                                   command_string=create_model_command)\n",
    "    export_model_op=gcp_command_op(project_id=project_id,\n",
    "                                   command_string=export_model_command)\n",
    "    model_deploy_op=mlengine_deploy_op(model_uri=export_path,\n",
    "                                       project_id=project_id,\n",
    "                                       model_id=model,\n",
    "                                       version_id=model_version,\n",
    "                                       runtime_version=runtime_version,\n",
    "                                       python_version=python_version)\n",
    "    \n",
    "    #Set op dependencies\n",
    "    create_dataset_op.after(make_bucket_op)\n",
    "    create_view_op.after(create_dataset_op)\n",
    "    create_model_op.after(create_view_op)\n",
    "    export_model_op.after(create_model_op)\n",
    "    model_deploy_op.after(export_model_op)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Compile and Run the Pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_func = bqml_to_caip\n",
    "pipeline_filename = pipeline_func.__name__ + '.zip'\n",
    "\n",
    "import kfp.compiler as compiler\n",
    "compiler.Compiler().compile(pipeline_func, pipeline_filename)\n",
    "\n",
    "arguments = {}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create an experiment name.  If this experiment already exist, this step will set the experiment name to the specified experiment."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = kfp.Client(KFPHOST)\n",
    "experiment = client.create_experiment(KFP_EXPERIMENT_NAME)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Submit a Pipeline run under the experiment name above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Submit a pipeline run\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "run_result = client.run_pipeline(\n",
    "    experiment_id=experiment.id, \n",
    "    job_name=run_name, \n",
    "    pipeline_package_path=pipeline_filename, \n",
    "    params=arguments)"
   ]
  }
 ],
 "metadata": {
  "environment": {
   "name": "tf-cpu.1-15.m48",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/tf-cpu.1-15:m48"
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
